package com.zhl.elasticsearch_demo.service.impl;

import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author zhl
 * @date 2019/9/16 0016 0:30
 */
@Service
public class ServiceImpl{

    Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    RestHighLevelClient client;


    /**
     * 添加1
     */
    public void add1(){
        IndexRequest request = new IndexRequest("1");
        request.id("2");
        String jsonString = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        request.source(jsonString, XContentType.JSON);
        try {
            client.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    /**
     * 添加文档
     * @param obj
     */
    public String createDoc(Object obj){
        /**
         * 设置索引，必须是小写，所以使用toLowerCase()方法进行转换
         */
        IndexRequest request = new IndexRequest(obj.getClass().getSimpleName().toLowerCase());
        /**
         * 设置文档id。如果不设置，系统会自动生成
         */
        //request.id("xxxx");
        request.source(JSONObject.toJSONString(obj), XContentType.JSON);
        try {
            IndexResponse response = client.index(request, RequestOptions.DEFAULT);
            log.info(JSONObject.toJSONString(response));
            if(DocWriteResponse.Result.CREATED == response.getResult()){
                return response.getId();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * 批量处理文档
     * @param list 需要处理的文档列表
     */
    public void createMultiDoc(List<Object> list){
        BulkRequest request = new BulkRequest();
        /**
         * 遍历
         * IndexRequest不设置id，让系统自己添加
         * DeleteRequest 批量删除
         * UpdateRequest 批量更新
         */
        list.forEach((obj)->{
            request.add(new IndexRequest(obj.getClass().getSimpleName().toLowerCase())
                    .source(JSONObject.toJSONString(obj), XContentType.JSON));
        });
        try {
            BulkResponse bulkResponse = client.bulk(request,RequestOptions.DEFAULT);
            log.info(JSONObject.toJSONString(bulkResponse));
            bulkResponse.forEach((BulkItemResponse bulkItemResponse)->{
                DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                switch (bulkItemResponse.getOpType()) {
                    case INDEX:
                    case CREATE:
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        log.info(JSONObject.toJSONString(indexResponse));
                        if(DocWriteResponse.Result.CREATED == indexResponse.getResult()){
                            log.info("添加成功");
                        }
                        break;
                    case UPDATE:
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                        break;
                    case DELETE:
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        break;
                    default:break;
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 更新文档
     * @param obj
     * @param id
     */
    public Boolean updateDoc(Object obj, String id){
        /**
         * 设置索引，必须是小写，所以使用toLowerCase()方法进行转换
         */
        UpdateRequest request = new UpdateRequest(obj.getClass().getSimpleName().toLowerCase(), id);
        request.doc(JSONObject.toJSONString(obj), XContentType.JSON);
        try {
            UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
            updateResponse.getGetResult();
            if(DocWriteResponse.Result.UPDATED == updateResponse.getResult()){
                return true;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }


    /**
     * 批量更新文档
     * @param list
     */
    public void updateMultiDoc(List<HashMap<String,Object>> list){
        BulkRequest request = new BulkRequest();
        /**
         * 遍历
         * IndexRequest不设置id，让系统自己添加
         * DeleteRequest 批量删除
         * UpdateRequest 批量更新
         */
        list.forEach((map)->{
            Object obj = map.get("obj");
            request.add(
                    new UpdateRequest(obj.getClass().getSimpleName().toLowerCase(), (String) map.get("id"))
                            .doc(JSONObject.toJSONString(obj), XContentType.JSON)
            );
        });
        try {
            BulkResponse bulkResponse = client.bulk(request,RequestOptions.DEFAULT);
            bulkResponse.forEach((BulkItemResponse bulkItemResponse)->{
                DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                switch (bulkItemResponse.getOpType()) {
                    case INDEX:
                    case CREATE:
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        break;
                    case UPDATE:
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                        log.info(JSONObject.toJSONString(updateResponse));
                        if(DocWriteResponse.Result.UPDATED == updateResponse.getResult()){
                            log.info("更新成功");
                        }
                        break;
                    case DELETE:
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        break;
                    default:break;
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 批量删除文档
     * @param list
     */
    public void deleteMultiDoc(String index, List<String> list){
        BulkRequest request = new BulkRequest();
        /**
         * 遍历
         * IndexRequest不设置id，让系统自己添加
         * DeleteRequest 批量删除
         * UpdateRequest 批量更新
         */
        list.forEach((id)->{
            request.add(new DeleteRequest(index.toLowerCase(), id));
        });
        try {
            BulkResponse bulkResponse = client.bulk(request,RequestOptions.DEFAULT);
            bulkResponse.forEach((BulkItemResponse bulkItemResponse)->{
                DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                switch (bulkItemResponse.getOpType()) {
                    case INDEX:
                    case CREATE:
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        break;
                    case UPDATE:
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                        break;
                    case DELETE:
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        log.info(JSONObject.toJSONString(deleteResponse));
                        if(DocWriteResponse.Result.UPDATED == deleteResponse.getResult()){
                            log.info("删除成功");
                        }
                        break;
                    default:break;
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 索引是否存在
     * @param indeces
     */
    public Boolean exist(String indeces){
        GetIndexRequest request = new GetIndexRequest(indeces.toLowerCase());
        Boolean exist = null;
        try {
            exist = client.indices().exists(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return exist;
    }


    /**
     * 删除索引
     * @param indeces
     */
    public void delete(String indeces){
        DeleteIndexRequest request = new DeleteIndexRequest(indeces.toLowerCase());
        try {
            AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
            boolean acknowledged = deleteIndexResponse.isAcknowledged();
            log.info("删除索引：{}", acknowledged);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     *
     * @param clasz 要查询的类
     * @param value 搜索内容
     * @param start 开始下标
     * @param size 搜索范围
     * @param fieldName 搜索字段
     * @return
     */
    public SearchHit[] searchDoc(Class clasz, String value, int start, int size, String... fieldName){

        SearchRequest searchRequest;
        if (clasz == null) {
            searchRequest = new SearchRequest();
        }else{
            searchRequest = new SearchRequest(clasz.getSimpleName().toLowerCase());
        }
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        log.info("fieldName.length:{},fieldName:{}",fieldName.length,fieldName);
        QueryBuilder queryBuilder = QueryBuilders.multiMatchQuery(value, fieldName);
        searchSourceBuilder.query(queryBuilder);
        searchSourceBuilder.from(start);
        searchSourceBuilder.size(size);
        searchRequest.source(searchSourceBuilder);
        return search2(searchRequest);

    }

    /**
     * 使用_id查询文档
     * @param indeces
     * @param _id
     */
    public void getById(String indeces, String _id){
        SearchRequest searchRequest = new SearchRequest(indeces.toLowerCase());
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        QueryBuilder qb = QueryBuilders.matchQuery("_id",_id);
        searchSourceBuilder.query(qb);
        searchSourceBuilder.timeout(TimeValue.timeValueSeconds(60));
        searchRequest.source(searchSourceBuilder);
        search(searchRequest);
    }

    /**
     * 使用id查询文档
     * @param clasz 类
     * @param id 查询id
     */
    public SearchHit[] findById(Class clasz, String id){
        SearchRequest searchRequest = new SearchRequest(clasz.getSimpleName().toLowerCase());
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        QueryBuilder qb = QueryBuilders.matchQuery("_id",id);
        searchSourceBuilder.query(qb);
        searchSourceBuilder.timeout(TimeValue.timeValueSeconds(60));
        searchRequest.source(searchSourceBuilder);
        return search2(searchRequest);
    }

    /**
     * 删除文档
     * @param indeces 索引
     * @param delete_id 要删除的id
     */
    public void deleteById(String indeces, String delete_id){
        DeleteRequest request = new DeleteRequest(indeces.toLowerCase(), delete_id);
        // 等待主分片可用的超时时间
        request.timeout(TimeValue.timeValueMinutes(10));
        DeleteResponse deleteResponse = null;
        try {
            deleteResponse = client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        String index = deleteResponse.getIndex();
        String id = deleteResponse.getId();
        long version = deleteResponse.getVersion();
        log.info("index:" + index + "; id:" + id + ",version:" + version);
        //判断删除的文档是否存在 ：
        if(deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND){
            log.error("未找到需要删除的文档!");
            return;
        }
        ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            log.error("未完全执行所有分片,总分片数为：" + shardInfo.getTotal() + ",执行的分片数为："+ shardInfo.getSuccessful());
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                String reason = failure.reason();
                log.error("失败原因：" + reason);
                return;
            }
        }
    }


    public void search(SearchRequest searchRequest){
        try {
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

            RestStatus status = searchResponse.status();
            TimeValue took = searchResponse.getTook();
            Boolean terminatedEarly = searchResponse.isTerminatedEarly();
            boolean timedOut = searchResponse.isTimedOut();
            log.info("测试statusindex:{}",status);
            log.info("测试took:{}",took);
            log.info("测试terminatedEarly:{}",terminatedEarly);
            log.info("测试timedOut:{}",timedOut);

            SearchHits hits = searchResponse.getHits();
            SearchHit[] searchHits = hits.getHits();
            for (SearchHit hit : searchHits) {
                // do something with the SearchHit
                String index = hit.getIndex();
                String id = hit.getId();
                float score = hit.getScore();
                long version = hit.getVersion();

                log.info("测试index:{}",index);
                log.info("测试id:{}",id);
                log.info("测试score:{}",score);
                log.info("测试version:{}",version);
                String sourceAsString = hit.getSourceAsString();
                log.info("测试sourceAsString:{}",sourceAsString);
                Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                String name = (String) sourceAsMap.get("name");
                log.info("测试name:{}",name);
                log.info("测试hit json:{}",JSONObject.toJSONString(hit));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public SearchHit[] search2(SearchRequest searchRequest){
        try {
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            SearchHits hits = searchResponse.getHits();
            SearchHit[] searchHits = hits.getHits();
            return searchHits;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 创建索引，使用ik分词插件
     * 按照实体类实际字段，手动编写索引的mapping
     */
    public void createIndex(String index_name){

        try {
            CreateIndexRequest index = new CreateIndexRequest(index_name);
            Map<String,Object> properties = new HashMap();
            Map<String,Object> propertie = new HashMap();
            propertie.put("type","text");
            propertie.put("index",true);
            propertie.put("analyzer","ik_max_word");
            properties.put("field_name",propertie);
            XContentBuilder builder = JsonXContent.contentBuilder();
            builder.startObject()
                        .startObject("mappings")
                                .startObject("properties")
                                    .startObject("desc")
                                        .field("type","text")
                                        .field("index",true)
                                        .field("analyzer","ik_max_word")
                                    .endObject()
                                .endObject()

                        .endObject()
                        .startObject("settings")
                            .field("number_of_shards",3)
                            .field("number_of_replicas",1)
                        .endObject()
                    .endObject();

            index.source(builder);
            client.indices().create(index,RequestOptions.DEFAULT);

        } catch (IOException e) {
            e.printStackTrace();

        }
    }

    /**
     * 创建索引
     * 将实体类的String类型的字段添加ik中文分词，其他字段在实际添加数据后会按类型自动添加
     * @param clasz
     * @return
     */
    public Boolean createIndex2(Class clasz){
        try {
            CreateIndexRequest index = new CreateIndexRequest(clasz.getSimpleName().toLowerCase());

            XContentBuilder builder = JsonXContent.contentBuilder();
            builder.startObject()
                    .startObject("mappings")
                    .startObject("properties");
            Field[] fields = clasz.getDeclaredFields();
            for(Field field : fields){
                Class<?> type = field.getType();
                //这里只对String类型的字段添加ik中文分词处理
                if(type.getSimpleName().equals("String")){
                    builder.startObject(field.getName().toLowerCase())
                                .field("type","text")
                                .field("index",true)
                                .field("analyzer","ik_max_word")
                            .endObject();
                }
            }
            builder.endObject().endObject();
            builder.startObject("settings")
                    //分片数
                    .field("number_of_shards",1)
                    //副本数,1台机器设为0
                    .field("number_of_replicas",0)
                    .endObject()
                    .endObject();

            index.source(builder);
            CreateIndexResponse response = client.indices().create(index,RequestOptions.DEFAULT);
            log.info(JSONObject.toJSONString(response));
            return response.isAcknowledged();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }


}
